For task #4 I decided to figure out which sandwich is the most popular. From the previous task I had a set of dishes from the 'American (New)' category. In addition, I decided not to stop at sandwiches only, but also to look for the dishes closest to 'sandwich'.
In [1]:
import os
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import BooleanType

basePath = 'dataminingcapstone-001'
workingDir = os.path.join(os.curdir, basePath)
In [2]:
sqlContext = SQLContext(sc)
targetDir = os.path.join(workingDir, 'yelp_dataset_challenge_academic_dataset')
businessJSON = os.path.join(targetDir, 'yelp_academic_dataset_business.json')
businessDF = sqlContext.read.json(businessJSON)
reviewsJSON = os.path.join(targetDir, 'yelp_academic_dataset_review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)
contains = udf(lambda xs, val: val in xs, BooleanType())
restaurantsDF = businessDF[contains(businessDF.categories, lit(u'American (New)'))]
selectedReviewsDF = reviewsDF.join(restaurantsDF,
                                   restaurantsDF.business_id == reviewsDF.business_id)
reviewsRDD = selectedReviewsDF.select("text").map(lambda x: x.text)
In [3]:
import string
remove_punctuation_map = dict((ord(char), None) for char in string.punctuation)
wordsRDD = (reviewsRDD
            .map(lambda x: x.translate(remove_punctuation_map).lower())
            .map(lambda row: row.split(" ")))
In [4]:
from pyspark.mllib.feature import Word2Vec
word2vec = Word2Vec()
model = word2vec.fit(wordsRDD)
In [5]:
print 'synonyms for word: \'{}\''.format('sandwich')
synonyms = model.findSynonyms('sandwich', 5)
for synonym, cosine_distance in synonyms:
print("\t{}: {}".format(synonym.encode('utf-8'), cosine_distance))
In [6]:
core_dish = [x[0] for x in synonyms]
core_dish.append(u'sandwich')
print core_dish
In [7]:
dish_file_path = os.path.join(workingDir, 'dishes.txt')
dish_names = (sc.textFile(dish_file_path)
              .filter(lambda dish_name:
                      any(word in dish_name for word in core_dish))
              .collect())
print "length of dish names: {}".format(len(dish_names))
print dish_names
In [10]:
from nltk.stem import WordNetLemmatizer
wnl = WordNetLemmatizer()
def lemmatize_phrase(phrase):
return " ".join([wnl.lemmatize(w) for w in phrase.split()])
In [12]:
normalized_dish_names = {lemmatize_phrase(w) for w in dish_names}

def contains_count(name):
    # Count in how many normalized dish names this name occurs as a substring.
    count = 0
    for w in normalized_dish_names:
        if name in w:
            count += 1
    return count

# Keep only names that occur as a substring of exactly one dish name (themselves),
# i.e. drop names that are contained in longer dish names.
unique_dish_names = [w for w in normalized_dish_names if contains_count(w) == 1]
print "length of unique dish names: {}".format(len(unique_dish_names))
print unique_dish_names
In [45]:
remove_punctuation_map = dict((ord(char), None) for char in string.punctuation)
focusedReviewsRDD = reviewsRDD.filter(lambda review: any(word in review for word in core_dish))
preprocessedReviewRDD = (focusedReviewsRDD
                         .map(lambda x: x.translate(remove_punctuation_map))
                         .map(lambda review: lemmatize_phrase(review))
                         )
preprocessedReviewRDD.cache()
Out[45]:
In [46]:
print preprocessedReviewRDD.count()
In [47]:
def filter_by_dish_name(dish_name):
    return preprocessedReviewRDD.filter(lambda review: dish_name in review)
dish_names_occurrence = {name: filter_by_dish_name(name) for name in unique_dish_names}
In [48]:
dish_names_occurrence_count = [(name, rdd.count()) for name, rdd in dish_names_occurrence.items()]
non_empty_occurrence_count = [(name, count) for name, count in dish_names_occurrence_count if count != 0]
print non_empty_occurrence_count
In [88]:
print len(non_empty_occurrence_count)
In [49]:
import operator
sorted_names = sorted(non_empty_occurrence_count, key=operator.itemgetter(1), reverse=True)[:80]
dishes_rdd = [(name, dish_names_occurrence[name]) for name, count in sorted_names]
In [183]:
middle_ticks = [2, 3, 4]
ticks = [1] + middle_ticks + [12]
print ticks
In [238]:
%matplotlib inline
import operator
import numpy as np
import matplotlib.pyplot as plt
def showBar(data, title, ylabel, xlabel, labels_y=3, labels=None, color_points=None, middle_ticks=None):
    length = len(data)
    ind = np.arange(length)
    width = 0.5
    counts = [c for (names, c) in data]
    if color_points:
        # Normalize the colour values so they map onto the 'coolwarm' colour map.
        min_color_points = min(color_points)
        max_color_points = max(color_points)
        length_color_points = max_color_points - min_color_points
        norm_color_points = [(color_point - min_color_points) / length_color_points for color_point in color_points]
        color_map = plt.get_cmap('coolwarm')
        colors = np.array([color_map(color_point) for color_point in norm_color_points])
        # A throw-away scatter plot is used only to obtain a mappable for the colour bar.
        heatmap = plt.scatter(ind, ind, c=color_points, cmap='coolwarm')
        plt.clf()
        fig, ax = plt.subplots()
        bar = ax.bar(ind, counts, color=colors)
        if middle_ticks:
            ticks = [min_color_points] + middle_ticks + [max_color_points]
        else:
            ticks = None
        cbar = plt.colorbar(heatmap, ticks=ticks)
    else:
        fig, ax = plt.subplots()
        bar = ax.bar(ind, counts, color='r')
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    plt.xticks(ind + width, [name for (name, c) in data], rotation='vertical')
    if not labels:
        labels = counts

    def autolabel(rects):
        # Put the label of each bar above it, rotated vertically.
        idx = 0
        for rect in rects:
            height = rect.get_height()
            ax.text(rect.get_x() + 0.8, height + labels_y, str(labels[idx]), ha='center', va='bottom', rotation='vertical')
            idx += 1

    autolabel(bar)
    fig.set_size_inches(18.5, 10.5)
    # Note: every call saves to the same file name, so later plots overwrite earlier ones.
    fig.savefig('reviews_count_per_dish.png', dpi=200)
    plt.show()
In [185]:
showBar(sorted_names,
        'Count of occurrences in reviews of each dish name',
        'Count of occurrences',
        'Dish name')
In [68]:
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer(min_df=2, stop_words='english')
vectorizer.fit_transform(preprocessedReviewRDD.toLocalIterator())
features = vectorizer.get_feature_names()
In [77]:
from sklearn.feature_extraction.text import CountVectorizer
cv = CountVectorizer(stop_words='english')
cv.fit_transform(preprocessedReviewRDD.toLocalIterator())
cv_features = cv.get_feature_names()
In [78]:
def cv_feature_extractor(words):
    v = cv.transform([" ".join(words)]).toarray()[0]
    return {cv_features[i]: v[i] for i in range(0, len(v))}
In [69]:
def tfidf_feature_extractor(words):
    v = vectorizer.transform([" ".join(words)]).toarray()[0]
    return {features[i]: v[i] for i in range(0, len(v))}
In [82]:
from textblob import Blobber
from textblob.en.sentiments import NaiveBayesAnalyzer
naiveBayesAnalyzer = NaiveBayesAnalyzer()  # could also pass feature_extractor=cv_feature_extractor
naiveBayesAnalyzer.train()
blobber = Blobber(analyzer = naiveBayesAnalyzer)
In [83]:
def evaluate_polarity(review):
    if not review.strip():
        return 0.0
    blob = blobber(review)
    return blob.sentiment.p_pos - blob.sentiment.p_neg

def evaluate_mean_polarity(name, rdd):
    m = rdd.map(lambda x: evaluate_polarity(x)).mean()
    print "\'{}\' polarity mean is: {}".format(name, m)
    return m
In [84]:
dish_names_polarity = [(name, evaluate_mean_polarity(name, rdd)) for name, rdd in dishes_rdd]
In [186]:
polarities = [polarity for name, polarity in dish_names_polarity]
showBar(sorted_names,
        'Count of occurrences in reviews of each dish name',
        'Count of occurrences',
        'Dish name', color_points=polarities, middle_ticks=[0.0, 0.15, 0.3, 0.45, 0.6, 0.75, 0.9])
I tried to combine two characteristics of each dish: the sentiment score and the number of reviews. What we recommend should be something that satisfies us, and in our case the sentiment score plays that role. But we are all different, and we don't want to try something 'risky': here the number of reviews shows how confident we can be when relying on the reviews. Thus we need to penalize dishes with a lower number of reviews. I decided to scale the sentiment down in proportion to the gap from the maximum review count, score = 100 * polarity / (1 + ln(max_reviews / reviews)), so the 'temperature' of the score comes down if a dish has only a few reviews.
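As a quick illustration of how this penalty behaves (a minimal sketch with made-up numbers, not values from the dataset): a dish tied with the most-reviewed dish keeps its full polarity, while a dish with a tenth of that review count is scaled down by a factor of 1 + ln(10) ≈ 3.3.

import math

def satisfaction(polarity, reviews, max_reviews):
    # The fewer reviews a dish has relative to the maximum,
    # the larger the log factor and the smaller the final score.
    penalty = 1.0 + math.log(float(max_reviews) / reviews)
    return (polarity / penalty) * 100.0

print satisfaction(0.6, 1000, 1000)  # 60.0  - no penalty
print satisfaction(0.6, 100, 1000)   # ~18.2 - penalized by 1 + ln(10)

The helper satisfaction() and its inputs here are hypothetical and only show the shape of the formula; the actual computation over the dish data is in the next cell.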
In [190]:
import math
max_reviews = float(sorted_names[0][1])
def calculate_satisfaction(idx):
    # Scale the polarity of the idx-th dish down according to how far
    # its review count falls below the maximum review count.
    factor = max_reviews / sorted_names[idx][1]
    penalty = 1.0 + math.log(factor)
    return (dish_names_polarity[idx][1] / penalty) * 100.0

enhanced_polarity = [(sorted_names[i][0], calculate_satisfaction(i)) for i in range(0, 80)]
sorted_enhanced_polarity = sorted(enhanced_polarity, key=operator.itemgetter(1), reverse=True)
print sorted_enhanced_polarity
In [192]:
color_points = [val for name, val in sorted_enhanced_polarity]
labels = [round(val, 2) for name, val in sorted_enhanced_polarity]
showBar(sorted_enhanced_polarity,
        'Combined satisfaction score for each dish',
        'Combined satisfaction score',
        'Dish name', labels=labels, labels_y=0.5, color_points=color_points)
My idea for task 5 was to find the best place to try a sandwich. For that I defined the target dish name.
In [193]:
target_dish_name = 'sandwich'
I ran a Spark SQL search for all businesses in the 'American (New)' category whose reviews contain the word 'sandwich'.
In [194]:
like_argument = "%{}%".format(target_dish_name)
targetBusinessIds = (reviewsDF
                     .filter(reviewsDF['text'].like(like_argument))
                     .join(restaurantsDF, restaurantsDF['business_id'] == reviewsDF['business_id'])
                     .select(reviewsDF['business_id'])
                     .distinct()
                     .map(lambda x: x.business_id)
                     .collect()
                     )
In [195]:
print len(targetBusinessIds)
This gave me 727 restaurants. After that I collected all the reviews written for those restaurants for analysis.
In [206]:
reviewsDF.printSchema()
In [207]:
targetRestaurantsDF = restaurantsDF.filter(restaurantsDF['business_id'].inSet(targetBusinessIds))
targetReviewsDF = (reviewsDF
                   .join(targetRestaurantsDF, restaurantsDF['business_id'] == reviewsDF['business_id'])
                   .select(restaurantsDF['business_id'], restaurantsDF['name'],
                           reviewsDF['review_id'], reviewsDF['stars'])
                   )
In [208]:
targetReviewsDF.count()
Out[208]:
Then I started with a simple task, namely counting how many reviews each restaurant from the selected dataset has. I sorted them by review count and kept only the first 80.
In [241]:
import pyspark.sql.functions as F
sorted_restaurants_with_stars = (targetReviewsDF.groupBy(restaurantsDF['business_id'], restaurantsDF['name'])
                                 .agg(F.count('review_id'), F.avg('stars'))
                                 .sort(col('COUNT(review_id)').desc())
                                 .map(lambda x: (x[0], x[1], x[2], x[3]))
                                 ).take(80)
In [242]:
sorted_restaurants_name = [(name, count) for business_id, name, count, stars in sorted_restaurants_with_stars]
showBar(sorted_restaurants_name,
        'Count of reviews per restaurant',
        'Count of reviews',
        'Restaurant name', labels_y=30)
In [243]:
average_stars = [stars for business_id, name, count, stars in sorted_restaurants_with_stars]
showBar(sorted_restaurants_name,
        'Count of reviews per restaurant',
        'Count of reviews',
        'Restaurant name', labels_y=30, color_points=average_stars,
        middle_ticks=[2.75, 3.00, 3.25, 3.50, 3.75, 4.0, 4.25])
Because of the really large number of reviews I focused only on a short list of the restaurants from our list with the highest average rating.
In [334]:
shortList = sorted(sorted_restaurants_with_stars, key=operator.itemgetter(3), reverse=True)[:10]
for x in shortList:
print "\'{}\' number of reviews: {}\t average rating: {}".format(x[1], x[2], x[3])
In [252]:
def getIteratorOverAllReviews(business_id):
    return (reviewsDF
            .filter(reviewsDF['business_id'] == business_id)
            .select(reviewsDF['text'])
            .map(lambda x: x.text)
            )
In [332]:
shortListRestaurantsReviews = [(business_id, name, getIteratorOverAllReviews(business_id))
                               for business_id, name, count, stars in shortList]
print shortListRestaurantsReviews
In [270]:
reviewsDir = os.path.join(workingDir, 'reviews')
def writeReviews(business_id, name, rdd):
    # Write every review of the restaurant as a single line of '<business_id>.txt',
    # collapsing internal whitespace so one line corresponds to one review.
    reviewsFile = os.path.join(reviewsDir, '{}.txt'.format(business_id))
    reviewsIt = rdd.toLocalIterator()
    with open(reviewsFile, 'w') as f:
        for line in reviewsIt:
            line = ' '.join(line.encode('utf-8').split())
            f.write(line)
            f.write('\n')
In [333]:
for business_id, name, rdd in shortListRestaurantsReviews:
    writeReviews(business_id, name, rdd)
In [341]:
def evaluateSentimentsAverage(business_id):
    # The '<business_id>.out' files are not created in this notebook; they come from a
    # separate sentiment-analysis run over the review files written above. The
    # second-to-last token of each line is taken to be the review's numeric sentiment score.
    reviewsFile = os.path.join(reviewsDir, '{}.out'.format(business_id))
    return (sc.textFile(reviewsFile)
            .map(lambda x: float(x[-20:].split()[-2]))
            ).mean()
In [342]:
sentiments_per_restaurants = [(name, evaluateSentimentsAverage(business_id))
                              for business_id, name, rdd in shortListRestaurantsReviews]
In [343]:
print sentiments_per_restaurants
In [344]:
%matplotlib inline
import operator
import numpy as np
import matplotlib.pyplot as plt
length = len(sentiments_per_restaurants)
ind = np.arange(length)
width = 0.5
counts = [c for names, c in sentiments_per_restaurants]
fig, ax = plt.subplots()
bar = ax.bar(ind, counts, color='r')
ax.set_title('Average sentiment score for restaurants from the short list')
ax.set_ylabel('Average sentiment score')
ax.set_xlabel('Restaurant name')
plt.xticks(ind + width, [name for (name, c) in sentiments_per_restaurants], rotation='vertical')
plt.yticks([0.0, 1.0, 2.0], ['Very_Negative', 'Negative', 'Neutral'])
fig.savefig('short_list_sentiment.png', dpi=200)
plt.show()
In [351]:
restaurantsDF.filter(col("business_id") == "rZbHg4ACfN3iShdsT47WKQ").select('full_address').collect()
Out[351]: